package com.sdg.statistics

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object StatisticsRecommender {

  val MONGO_URI: String = "mongodb://vm2:27017/recom3"

  val MONGODB_DATABASE: String = "recom3"

  val MONGODB_RATING_COLLECTION = "Rating"

  // MongoDB Spark connector data-source identifier.
  // NOTE(review): constant name has a typo ("DRVIVE"); kept as-is so any
  // external references to this public val do not break.
  val MONGO_DRVIVE_CLASS = "com.mongodb.spark.sql"

  val MONGODB_MOVIE_COLLECTION = "Movie"

  // Output collection: movies with the most ratings over all history.
  val MONGODB_RATE_MORE_MOVIES_COLLECTION = "RateMoreMovies"
  // Output collection: movies with the most ratings per month (trending).
  val MONGODB_RATE_MORE_MOVIES_RECENTLY_COLLECTION = "RateMoreMoviesRecently"
  // Output collection: average rating per movie.
  val MONGODB_AVERAGE_MOVIES_SCORE_COLLECTION = "AverageMoviesScore"
  // Output collection: top-N movies per genre, by average rating.
  val MONGODB_GENRES_TOP_MOVIES_COLLECTION = "GenresTopMovies"

  /**
    * Writes a DataFrame into the given MongoDB collection, replacing any
    * existing contents. Extracted helper: the same 6-line write recipe was
    * previously duplicated in all three statistics methods.
    *
    * @param df         data to persist
    * @param collection target MongoDB collection name
    * @param mongoConf  MongoDB connection settings
    */
  private def saveToMongo(df: DataFrame, collection: String)(implicit mongoConf: MongoConfig): Unit =
    df.write
      .option("uri", mongoConf.uri)
      .option("collection", collection)
      .mode("overwrite")
      .format(MONGO_DRVIVE_CLASS)
      .save()

  /**
    * Counts how many ratings each movie received over all history and stores
    * the result in the RateMoreMovies collection.
    * Expects a temp view named "ratings" to be registered on `spark`.
    */
  def rateMore(spark: SparkSession)(implicit mongoConf: MongoConfig): Unit = {
    val rateMoreDF = spark.sql(
      "select mid, count(mid) as count from ratings group by mid order by count desc")
    saveToMongo(rateMoreDF, MONGODB_RATE_MORE_MOVIES_COLLECTION)
  }

  /**
    * Counts ratings per movie per calendar month (trending statistic) and
    * stores the result in the RateMoreMoviesRecently collection.
    * Expects a temp view named "ratings" whose `timestamp` column holds epoch
    * seconds (the UDF multiplies by 1000 to get milliseconds).
    */
  def rateMoreRecently(spark: SparkSession)(implicit mongoConf: MongoConfig): Unit = {
    // Each Spark task deserializes its own closure copy of this formatter, so
    // the non-thread-safe SimpleDateFormat is not shared across threads.
    val simpleDateFormat = new SimpleDateFormat("yyyyMM")
    // UDF: epoch seconds -> yyyyMM as a Long.
    spark.udf.register("changDate", (x: Long) => simpleDateFormat.format(new Date(x * 1000L)).toLong)
    val yeahMonthOfRatings = spark.sql(
      "select mid, uid, score, changDate(timestamp) as yeahmonth from ratings")
    yeahMonthOfRatings.createOrReplaceTempView("ymRatings")
    val rateMoreRecentlyDF = spark.sql(
      "select mid, count(mid) as count,yeahmonth from ymRatings group by yeahmonth,mid order by yeahmonth desc,count desc")
    saveToMongo(rateMoreRecentlyDF, MONGODB_RATE_MORE_MOVIES_RECENTLY_COLLECTION)
  }

  /**
    * Computes two statistics from the "ratings" temp view:
    *   1. the average rating per movie, saved to AverageMoviesScore;
    *   2. the 10 highest-rated movies of each genre, saved to GenresTopMovies.
    *
    * @param spark  active session with the "ratings" view registered
    * @param movies movie metadata (provides the `genres` string per movie)
    */
  def averageMovieScore(spark: SparkSession, movies: Dataset[Movie])(implicit mongoConf: MongoConfig): Unit = {
    import spark.implicits._

    // Cached: reused below for the per-genre top lists.
    val averageMovieScoreDF =
      spark.sql("select mid, avg(score) as avg from ratings group by mid").cache()
    saveToMongo(averageMovieScoreDF, MONGODB_AVERAGE_MOVIES_SCORE_COLLECTION)

    // All genres we report on.
    // FIX: "Ccrime" corrected to "Crime" — the typo meant no movie's genres
    // string ever contained it, so crime movies were silently dropped.
    val genres = List("Action", "Adventure", "Animation", "Comedy", "Crime",
      "Documentary", "Drama", "Family", "Fantasy", "Foreign", "History",
      "Horror", "Music", "Mystery", "Romance", "Science", "Tv", "Thriller",
      "War", "Western")
    val genresRdd = spark.sparkContext.parallelize(genres)

    // [mid, avg, genres] for every movie that has an average score.
    // FIX: join key listed once (was the redundant Seq("mid", "mid")).
    val moviesWithScoreDF =
      movies.join(averageMovieScoreDF, Seq("mid")).select("mid", "avg", "genres").cache()

    // genre x movie cartesian product, keeping pairs where the movie's genres
    // string contains the genre (case-insensitive), then the 10 best-rated
    // movies per genre. (Debug foreach/println removed — it triggered an
    // extra action solely to print to executor logs.)
    val genresTopMovies = genresRdd
      .cartesian(moviesWithScoreDF.rdd)
      .filter { case (genre, row) =>
        row.getAs[String]("genres").toLowerCase.contains(genre.toLowerCase)
      }
      .map { case (genre, row) =>
        (genre, (row.getAs[Int]("mid"), row.getAs[Double]("avg")))
      }
      .groupByKey()
      .map { case (genre, items) =>
        // Highest average first; keep the top 10.
        GenresRecommendation(genre,
          items.toList.sortWith(_._2 > _._2).take(10).map(x => Recommendation(x._1, x._2)))
      }
      .toDF()

    saveToMongo(genresTopMovies, MONGODB_GENRES_TOP_MOVIES_COLLECTION)
  }

  /**
    * Entry point: loads ratings and movies from MongoDB, registers the
    * "ratings" view, then runs the three statistics jobs.
    */
  def main(args: Array[String]): Unit = {

    val conf = Map(
      "spark.cores" -> "local[2]",
      "mongo.uri" -> MONGO_URI,
      "mongo.db" -> MONGODB_DATABASE)

    val sparkConf = new SparkConf().setAppName("statisticsRecommender").setMaster(conf("spark.cores"))
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    implicit val mongoConf: MongoConfig = MongoConfig(conf("mongo.uri"), conf("mongo.db"))

    // Implicit conversions for Dataset encoders (rdd => dataframe).
    import spark.implicits._

    // Ratings dataset; cached because every statistic reads it.
    val ratings = spark.read
      .option("uri", mongoConf.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format(MONGO_DRVIVE_CLASS)
      .load()
      .as[MoviesRating]
      .cache()

    // Movie metadata; cached because the genre statistic joins against it.
    val movies = spark.read
      .option("uri", mongoConf.uri)
      .option("collection", MONGODB_MOVIE_COLLECTION)
      .format(MONGO_DRVIVE_CLASS)
      .load()
      .as[Movie]
      .cache()

    ratings.createOrReplaceTempView("ratings")

    // 1. Most-rated movies overall => RateMoreMovies
    rateMore(spark)
    // 2. Most-rated movies per month => RateMoreMoviesRecently
    rateMoreRecently(spark)
    // 3 & 4. Average score per movie + top-N per genre
    averageMovieScore(spark, movies)

    // FIX: previously the session was never stopped; release resources.
    spark.stop()
  }

}

/**
  * MongoDB connection settings.
  *
  * @param uri MongoDB connection URI
  * @param db  name of the MongoDB database to operate on
  */
case class MongoConfig(uri: String, db: String)

/**
  * A single rating a user gave a movie.
  *
  * @param uid       ID of the rating user
  * @param mid       ID of the rated movie
  * @param score     score the user gave the movie
  * @param timestamp time of the rating (epoch seconds — the changDate UDF
  *                  multiplies by 1000 to obtain milliseconds)
  */
case class MoviesRating(uid: Int, mid: Int, score: Double, timestamp: Int)

/**
  * A movie and its catalogue metadata.
  *
  * @param mid       movie ID
  * @param name      movie title
  * @param descri    movie description
  * @param timelong  movie runtime
  * @param issue     release date
  * @param shoot     shooting date
  * @param language  movie language
  * @param genres    genre names, as a single delimited string
  * @param actors    cast
  * @param directors directors
  */
case class Movie(
    mid: Int,
    name: String,
    descri: String,
    timelong: String,
    issue: String,
    shoot: String,
    language: String,
    genres: String,
    actors: String,
    directors: String)

/**
  * Recommendation list for one movie genre.
  *
  * @param genres movie genre this list is for
  * @param recs   recommended movies for the genre, best average rating first
  */
case class GenresRecommendation(genres: String, recs: Seq[Recommendation])


/**
  * A single recommended movie.
  *
  * @param rid movie ID
  * @param r   recommendation score (the movie's average rating here)
  */
case class Recommendation(rid: Int, r: Double)

